Architecture

We discussed Spark jobs in chapter 3. In this chapter, we look at the architecture and at how the master, workers, driver and executors are coordinated to finish a job.

Feel free to skip the code if you prefer diagrams.

Deployment diagram

We have seen the following diagram in the Overview chapter.

Next, we will go through it in more detail.

Job submission

The diagram below illustrates how the driver program (on the master node) produces a job and then submits it to the worker nodes.

The driver-side behavior is equivalent to the pseudocode below:

```scala
finalRDD.action()
=> sc.runJob()

// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
=>   dagSchedulerEventProcessActor ! JobSubmitted
=> dagSchedulerEventProcessActor.JobSubmitted()
=> dagScheduler.handleJobSubmitted()
=> finalStage = newStage()
=>   mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
=> dagScheduler.submitStage()
=>   missingStages = dagScheduler.getMissingParentStages()
=> dagScheduler.submitMissingTasks(readyStage)

// add tasks to the taskScheduler
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> fifoSchedulableBuilder.addTaskSetManager(taskSet)

// send tasks
=> sparkDeploySchedulerBackend.reviveOffers()
=> driverActor ! ReviveOffers
=> sparkDeploySchedulerBackend.makeOffers()
=> sparkDeploySchedulerBackend.launchTasks()
=> foreach task
      CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)
```

Explanation:

When the following code is evaluated, the program launches a number of driver-side services and communications, e.g. the job's executors, threads and actors.

```scala
val sc = new SparkContext(sparkConf)
```

This line defines the role of the driver.
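
As a concrete illustration (a minimal sketch; the object name, app name and master URL below are assumptions for illustration, not taken from this chapter), a driver program boils down to creating a SparkContext and then calling an action:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// A hypothetical driver program: the driver role starts when the SparkContext
// is created, and the action below triggers the submission chain shown above.
object MyDriverApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("architecture-demo")
      .setMaster("spark://master:7077") // assumed standalone master URL
    val sc = new SparkContext(sparkConf) // driver-side services start here

    try {
      // action => sc.runJob() => dagScheduler.runJob() => ...
      val n = sc.parallelize(1 to 1000, 4).count()
      println(s"count = $n")
    } finally {
      sc.stop()
    }
  }
}
```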

Job logical plan

transformation() in the driver program builds a computing chain (a series of RDDs), as sketched in the example after the list below. In each RDD:

  • the compute() function defines how the records of its partitions are computed
  • the getDependencies() function defines the dependency relationship between its partitions and those of its parent RDDs
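
As a sketch of how these two functions fit together (MyMappedRDD is a hypothetical class, not actual Spark source, and it assumes the classic RDD API), a one-to-one "map" RDD could look like this:

```scala
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// A minimal, hypothetical RDD that applies f to every record of its parent.
class MyMappedRDD[T, U: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev.context, Nil) {

  // compute() produces the records of one partition
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split, context).map(f)

  // getDependencies() declares a narrow (one-to-one) dependency on the parent
  override def getDependencies: Seq[Dependency[_]] =
    Seq(new OneToOneDependency(prev))

  // the partitions are the same as the parent's
  override protected def getPartitions: Array[Partition] = prev.partitions
}
```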

Job physical plan

Each action() triggers a job:

  • During dagScheduler.runJob(), the job is divided into stages
  • During submitStage(), the ResultTasks or ShuffleMapTasks needed by the stage are produced and packaged into a TaskSet, which is sent to the TaskScheduler. If the TaskSet can be executed, its tasks are submitted to sparkDeploySchedulerBackend, which distributes them (see the example after this list)
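
For example (a hypothetical job, assuming an existing SparkContext sc such as the one in the earlier sketch), a groupByKey-based computation is split into a stage of ShuffleMapTasks and a final stage of ResultTasks:

```scala
// A hypothetical two-stage job: groupByKey introduces a ShuffleDependency, so
// the DAGScheduler builds one stage of ShuffleMapTasks (map side) and one
// final stage of ResultTasks (reduce side).
val pairs  = sc.parallelize(1 to 100, 4).map(i => (i % 10, i))
val sizes  = pairs.groupByKey(numPartitions = 3).mapValues(_.size)
val result = sizes.collect() // action: triggers dagScheduler.runJob()
```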

Task distribution

After sparkDeploySchedulerBackend gets the TaskSet, the Driver Actor sends the serialized tasks to the CoarseGrainedExecutorBackend Actor on the worker node.

Job reception

After receiving tasks, the worker does the following:

```scala
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)

=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
```

The executor wraps each task into a taskRunner and picks a free thread to run it. A CoarseGrainedExecutorBackend process has exactly one executor.
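
The threading model can be sketched as follows (a simplified illustration, not the real Executor code; the real TaskRunner also deserializes the task, runs it, and reports status back to the driver):

```scala
import java.util.concurrent.Executors

// A simplified sketch: each incoming task is wrapped in a Runnable and handed
// to a shared thread pool owned by the single executor in the process.
class MiniExecutor {
  private val threadPool = Executors.newCachedThreadPool()

  class MiniTaskRunner(taskId: Long, serializedTask: Array[Byte]) extends Runnable {
    override def run(): Unit = {
      // real code: deserialize the task, run it, then send a status update to the driver
      println(s"running task $taskId (${serializedTask.length} bytes)")
    }
  }

  def launchTask(taskId: Long, serializedTask: Array[Byte]): Unit =
    threadPool.execute(new MiniTaskRunner(taskId, serializedTask))
}
```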

Task execution

The diagram below shows how a task received by a worker node is executed and how the driver processes the task's result.

After receiving a serialized task, the executor deserializes it into a normal task and then runs it to get a directResult, which will be sent back to the driver. It is noteworthy that a data package sent from an Actor cannot be too big:

  • If the result is too big (e.g. that of groupByKey), it is persisted to "memory + hard disk" and managed by blockManager. The driver only gets an indirectResult containing the storage location. When the result is needed, the driver fetches it via HTTP.
  • If the result is not too big (less than spark.akka.frameSize = 10MB), it is sent directly to the driver.

Some more details about blockManager:

When directResult > akka.frameSize, the memoryStore of BlockManager creates a LinkedHashMap to hold the data stored in memory, whose total size should be less than Runtime.getRuntime.maxMemory * spark.storage.memoryFraction (default 0.6). If the LinkedHashMap has no space for the incoming data, the data is handed over to diskStore, which persists it to hard disk if the data's storageLevel contains "disk".
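
A simplified model of this bookkeeping (not the real MemoryStore, which also handles unrolling, locking and the storageLevel-dependent hand-off to diskStore) might look like this:

```scala
import java.nio.ByteBuffer
import scala.collection.mutable

// A simplified memory store: an insertion-ordered map with a byte budget of
// maxMemory * memoryFraction; the oldest entries make room when it is full.
class MiniMemoryStore(memoryFraction: Double = 0.6) {
  private val maxBytes = (Runtime.getRuntime.maxMemory * memoryFraction).toLong
  private var currentBytes = 0L
  private val entries = mutable.LinkedHashMap[String, ByteBuffer]()

  def putBytes(blockId: String, bytes: ByteBuffer): Unit = {
    while (currentBytes + bytes.limit() > maxBytes && entries.nonEmpty) {
      val (oldestId, oldestBuf) = entries.head
      entries -= oldestId
      currentBytes -= oldestBuf.limit()
      // real code: hand the evicted block to diskStore if its storageLevel contains "disk"
    }
    entries(blockId) = bytes
    currentBytes += bytes.limit()
  }
}
```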

```scala
In TaskRunner.run()
// deserialize task, run it and then send the result to driver
=> coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if( directResult.size() > akkaFrameSize() )
       indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER)
   else
       return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)
```

The results produced by ShuffleMapTask and ResultTask are different.

  • ShuffleMapTask produces a MapStatus containing 2 parts:

    • the BlockManagerId of the task's BlockManager: (executorId + host, port, nettyPort)
    • the size of each output FileSegment of the task
  • ResultTask produces the execution result of the specified function on one partition, e.g. the function of count() simply counts the number of records in a partition. Since ShuffleMapTask needs to write FileSegments to disk, OutputStream writers are needed. These writers are produced and managed by the blockManager of shuffleBlockManager

```scala
In task.run(taskId)
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])

// If the task is ResultTask
=> return func(context, rdd.iterator(split, context))
```

A series of operations is executed after the driver gets a task's result:

TaskScheduler is notified that the task is finished, and its result is processed:

  • If it is an indirectResult, BlockManager.getRemoteBytes() is invoked to fetch the actual result.
    • If it is a ResultTask, ResultHandler() invokes the driver-side computation on the result (e.g. count() sums the results of all ResultTasks).
    • If it is the MapStatus of a ShuffleMapTask, the MapStatus is put into the mapStatuses of mapOutputTrackerMaster, which makes it easier to query during the reduce-side shuffle.
  • If the task received on the driver is the last task of its stage, the next stage is submitted. If the stage was already the last one, dagScheduler is informed that the job is finished.
```scala
After driver receives StatusUpdate(result)

=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is IndirectResult
      serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> taskSetManager.handleSuccessfulTask(tid, taskResult)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates)
=> dagScheduler.handleTaskCompletion(completion)
=> Accumulators.add(event.accumUpdates)

// If the finished task is ResultTask
=> if (job.numFinished == job.numPartitions)
      listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
=> job.listener.taskSucceeded(outputId, result)
=>    jobWaiter.taskSucceeded(index, result)
=>    resultHandler(index, result)

// If the finished task is ShuffleMapTask
=> stage.addOutputLoc(smt.partitionId, status)
=> if (all tasks in current stage have finished)
      mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus])
      mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
=> submitStage(stage)
```

Shuffle read

In the preceding paragraphs, we talked about task execution and result processing; now we will talk about how a reducer (a task that needs shuffled data) gets its input data. The shuffle read section of the last chapter already described how a reducer processes its input data.

How does the reducer know where to fetch the data?

The reducer needs to know on which nodes the FileSegments produced by the ShuffleMapTasks of the parent stage are located. This information is sent to the driver's mapOutputTrackerMaster when a ShuffleMapTask finishes, and it is stored in mapStatuses: HashMap[shuffleId, Array[MapStatus]]. Given the shuffleId, we get an Array[MapStatus] which contains information about the FileSegments produced by the ShuffleMapTasks. Array(taskId) contains the location (blockManagerId) and the size of each FileSegment. A simplified model of this lookup is sketched below.
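
The sketch below uses hypothetical Mini* types and is only meant to illustrate the shape of the data; the real MapStatus stores compressed sizes and lives inside MapOutputTracker:

```scala
import scala.collection.mutable.HashMap

// A simplified model of the driver-side map-output bookkeeping:
// for each shuffle, one entry per map task with its location and segment sizes.
case class MiniBlockManagerId(executorId: String, host: String, port: Int)
case class MiniMapStatus(location: MiniBlockManagerId, segmentSizes: Array[Long])

val mapStatuses = new HashMap[Int, Array[MiniMapStatus]]() // shuffleId -> statuses

// A reducer asks: where is my slice of each map task's output, and how big is it?
def serverStatuses(shuffleId: Int, reduceId: Int): Seq[(MiniBlockManagerId, Long)] =
  mapStatuses(shuffleId).map(s => (s.location, s.segmentSizes(reduceId)))
```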

When the reducer needs to fetch input data, it first invokes blockStoreShuffleFetcher to get the locations of the input data (FileSegments). blockStoreShuffleFetcher calls the local MapOutputTrackerWorker to do the work. MapOutputTrackerWorker uses mapOutputTrackerMasterActorRef to communicate with mapOutputTrackerMasterActor in order to get the MapStatus. blockStoreShuffleFetcher processes the MapStatus, finds out where the reducer should fetch each FileSegment, and stores this information in blocksByAddress. Finally, blockStoreShuffleFetcher tells basicBlockFetcherIterator to fetch the FileSegment data.

```scala
rdd.iterator()
=> rdd(e.g., ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)

=> blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
```

After basicBlockFetcherIterator receives the data-retrieval task, it produces several fetchRequests. Each of them contains the tasks for fetching FileSegments from one node. According to the diagram above, reducer-2 needs to fetch FileSegments (FS, in white) from 3 worker nodes. The global data-fetching task can be represented by blocksByAddress: 4 blocks from node 1, 3 blocks from node 2, and 4 blocks from node 3.

In order to accelerate the data-fetching process, it makes sense to divide the global task into sub-tasks (fetchRequests), each of which gets a thread to fetch data. Spark launches 5 parallel threads for each reducer (the same as Hadoop). Since the fetched data is buffered in memory, one fetch cannot take too much data (no more than spark.reducer.maxMbInFlight = 48MB). Note that the 48MB is shared by the 5 fetch threads, so each sub-task should take no more than 48MB / 5 = 9.6MB. In the diagram, on node 1, we have size(FS0-2) + size(FS1-2) < 9.6MB but size(FS0-2) + size(FS1-2) + size(FS2-2) > 9.6MB, so we should break between t1-r2 and t2-r2. As a result, we see 2 fetchRequests fetching data from node 1. Will there be fetchRequests larger than 9.6MB? The answer is yes: if a single FileSegment is very large, it still has to be fetched in one shot. Besides, if the reducer needs some FileSegments that already exist locally, it reads them locally. At the end of shuffle read, the fetched FileSegments are deserialized and offered as record iterators to RDD.compute(). The request-splitting rule is sketched right below.
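
The splitting rule can be written as follows (a simplified sketch of what splitLocalRemoteBlocks does, following the description above; MiniFetchRequest is a hypothetical type, and the real code also separates local blocks and randomizes the request order):

```scala
import scala.collection.mutable.ArrayBuffer

// Group one node's blocks into fetch requests of at most maxBytesInFlight / 5
// bytes each (48MB / 5 = 9.6MB); a single block larger than that still becomes
// its own request, since it must be fetched in one shot.
case class MiniFetchRequest(address: String, blocks: Seq[(String, Long)]) // hypothetical type

def splitIntoRequests(address: String,
                      blocks: Seq[(String, Long)], // (blockId, size in bytes)
                      maxBytesInFlight: Long = 48L * 1024 * 1024): Seq[MiniFetchRequest] = {
  val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
  val requests = ArrayBuffer[MiniFetchRequest]()
  var current = ArrayBuffer[(String, Long)]()
  var currentSize = 0L
  for ((blockId, size) <- blocks) {
    // start a new request if adding this block would exceed the per-request budget
    if (current.nonEmpty && currentSize + size > targetRequestSize) {
      requests += MiniFetchRequest(address, current.toSeq)
      current = ArrayBuffer[(String, Long)]()
      currentSize = 0L
    }
    current += ((blockId, size))
    currentSize += size
  }
  if (current.nonEmpty) requests += MiniFetchRequest(address, current.toSeq)
  requests.toSeq
}
```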

```scala
In basicBlockFetcherIterator:

// generate the fetch requests
=> basicBlockFetcherIterator.initialize()
=> remoteRequests = splitLocalRemoteBlocks()
=> fetchRequests ++= Utils.randomize(remoteRequests)

// fetch remote blocks
=> sendRequest(fetchRequests.dequeue()) until Size(fetchRequests) > maxBytesInFlight
=> blockManager.connectionManager.sendMessageReliably(cmId,
       blockMessageArray.toBufferMessage)
=> fetchResults.put(new FetchResult(blockId, sizeMap(blockId)))
=> dataDeserialize(blockId, blockMessage.getData, serializer)

// fetch local blocks
=> getLocalBlocks()
=> fetchResults.put(new FetchResult(id, 0, () => iter))
```

Some more details:

How does the reducer send a fetchRequest to the target node? How does the target node process the fetchRequest, read the FileSegment and send it back to the reducer?

When RDD.iterator() meets a ShuffleDependency, BasicBlockFetcherIterator is called to fetch FileSegments. BasicBlockFetcherIterator uses the connectionManager of blockManager to send fetchRequests to the connectionManagers on the other nodes. NIO is used for communication between connectionManagers. On the other nodes, for example, after the connectionManager on worker node 2 receives a message, it forwards the message to its blockManager. The latter uses diskStore to read the FileSegments requested by the fetchRequest locally, and they are sent back by connectionManager. If FileConsolidation is activated, diskStore needs the location of the blockId given by shuffleBlockManager. If a FileSegment is no larger than spark.storage.memoryMapThreshold = 8KB, diskStore reads the FileSegment directly into memory; otherwise, the memory-mapping method of the FileChannel of a RandomAccessFile is used, so that a large FileSegment can be loaded into memory.

When BasicBlockFetcherIterator receives serialized FileSegments from the other nodes, it deserializes them and puts them in fetchResults.Queue. You may notice that fetchResults.Queue is similar to softBuffer in the Shuffle details chapter. If the FileSegments needed by BasicBlockFetcherIterator are local, they are read locally by diskStore and put in fetchResults. Finally, the reducer reads the records from the FileSegments and processes them.

```scala
After the blockManager receives the fetch request

=> connectionManager.receiveMessage(bufferMessage)
=> handleMessage(connectionManagerId, message, connection)

// invoke blockManagerWorker to read the block (FileSegment)
=> blockManagerWorker.onBlockMessageReceive()
=> blockManagerWorker.processBlockMessage(blockMessage)
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> shuffleManager.getBlockLocation()
=> if(fileSegment < minMemoryMapBytes)
     buffer = ByteBuffer.allocate(fileSegment)
   else
     channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
```
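
The size-based choice between a direct read and a memory-mapped read shown above can be written as plain JVM I/O (a simplified sketch of the pattern, not Spark's DiskStore):

```scala
import java.io.{File, IOException, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel

// Read [offset, offset + length) of a file: small segments are copied into a
// heap ByteBuffer, large ones are memory-mapped via FileChannel.map.
def readSegment(file: File, offset: Long, length: Long,
                minMemoryMapBytes: Long = 8 * 1024): ByteBuffer = {
  val channel = new RandomAccessFile(file, "r").getChannel
  try {
    if (length < minMemoryMapBytes) {
      val buf = ByteBuffer.allocate(length.toInt)
      channel.position(offset)
      while (buf.remaining() > 0) {
        if (channel.read(buf) == -1) throw new IOException("unexpected end of file")
      }
      buf.flip()
      buf
    } else {
      channel.map(FileChannel.MapMode.READ_ONLY, offset, length)
    }
  } finally {
    channel.close()
  }
}
```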

Every reducer has a BasicBlockFetcherIterator, and one BasicBlockFetcherIterator could, in theory, hold 48MB of fetchResults. As soon as one FileSegment in fetchResults is read off, more FileSegments are fetched to fill up the 48MB.

```scala
BasicBlockFetcherIterator.next()
=> result = results.take()
=> while (!fetchRequests.isEmpty &&
        (bytesInFlight == 0 ||
         bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
        sendRequest(fetchRequests.dequeue())
      }
=> result.deserialize()
```

Discussion

In terms of architecture design, functionalities and modules are fairly independent. BlockManager is well designed, but it seems to manage too many things (data blocks, memory, disk and network communication).

This chapter discussed how the modules of the Spark system are coordinated to finish a job (job production, submission, execution, result collection, result computation and shuffle). A lot of code is pasted and many diagrams are drawn; more details can be found in the source code if you are interested.

If you want to know more about blockManager, please refer to Jerry Shao's blog (in Chinese).
